# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Abstract interfaces for physical domains description.
* :class:`~hysop.domain.domain.Domain`
* :class:`~hysop.domain.domain.DomainView`
"""
import hashlib
import numpy as np
from abc import ABCMeta, abstractmethod
from hysop.constants import HYSOP_DEFAULT_TASK_ID, HYSOP_DIM, HYSOP_INTEGER
from hysop.core.mpi import main_comm, MPI
from hysop.tools.parameters import MPIParams
from hysop.tools.decorators import debug
from hysop.tools.handle import RegisteredObject, TaggedObjectView
from hysop.tools.htypes import check_instance
from hysop.tools.numpywrappers import npw
from hysop.symbolic.frame import SymbolicFrame
class DomainView(TaggedObjectView, metaclass=ABCMeta):
    """Abstract base class for views on domains.

    A view couples a :class:`Domain` with a topology state. Almost every
    accessor simply forwards to the private attributes of the viewed domain.
    """

    __slots__ = ("_domain", "_topology_state")

    @debug
    def __init__(self, topology_state, domain=None, **kwds):
        super().__init__(obj_view=domain, **kwds)

    @debug
    def __new__(cls, topology_state, domain=None, **kwds):
        """Create and initialize a DomainView.

        Parameters
        ----------
        topology_state : :class:`~hysop.topology.topology.TopologyState`
            Topology state altering this domain view.
        domain : :class:`Domain`, optional
            Viewed domain. When None, the newly created object must itself be
            a :class:`Domain` and becomes its own viewed domain.
        """
        from hysop.topology.topology import TopologyState

        check_instance(topology_state, TopologyState)
        check_instance(domain, Domain, allow_none=True)
        obj = super().__new__(cls, obj_view=domain, **kwds)
        # Compare to None explicitly: do not rely on the truthiness of a domain.
        domain = obj if domain is None else domain
        check_instance(domain, Domain)
        obj._domain = domain
        obj._topology_state = topology_state
        return obj

    def _get_domain(self):
        """Return the domain on which the view is on."""
        return self._domain

    def _get_topology_state(self):
        """Return the topology state altering this domain view."""
        return self._topology_state

    def _get_dim(self):
        """Return the dimension of the domain."""
        return self._domain._dim

    def _get_parent_comm(self):
        """Return the parent communicator used to create this domain."""
        return self._domain._parent_comm

    def _get_parent_rank(self):
        """Return the rank of the process in the parent communicator."""
        return self._domain._parent_rank

    def task_intercomm(self, task_id):
        """
        Return the communicator linking the current process with the given task.

        Raises KeyError if no intercommunicator was built towards task_id.
        """
        return self._domain._task_intercomm[task_id]

    def _get_has_tasks(self):
        """Return True if the domain contains 2 tasks or more."""
        return self._domain._has_tasks

    def _get_all_tasks(self):
        """Return the set of all task identifiers."""
        return self._domain._all_tasks

    def task_root_in_parent(self, task_id):
        """Return the rank of the root process of task_id in the parent communicator."""
        return self._domain._task_root_in_parent[task_id]

    def _get_machine_comm(self):
        """
        Return the communicator that owns the current process.
        This is the sub-communicator which has been obtained by splitting
        the parent communicator by machine name.
        """
        return self._domain._machine_comm

    def _get_machine_rank(self):
        """Return the rank of the process in the machine communicator."""
        return self._domain._machine_rank

    def _get_proc_tasks(self):
        """Return mapping between mpi process rank and task identifier."""
        return self._domain._proc_tasks

    def _get_registered_topologies(self):
        """
        Return the dictionary of all topologies already built on this domain,
        with topology ids as keys and :class:`~hysop.topology.topology.Topology` as values.
        """
        return self._domain._registered_topologies

    def _get_frame(self):
        """Get symbolic frame associated to this domain."""
        return self._domain._frame

    def task_on_proc(self, parent_rank):
        """Get task identifier(s) for a given mpi process (parent communicator rank).

        Raises
        ------
        ValueError
            If parent_rank is not a valid rank of the parent communicator
            (negative ranks are rejected instead of indexing from the end).
        """
        proc_tasks = self._domain._proc_tasks
        if not 0 <= parent_rank < len(proc_tasks):
            msg = f"Unknown rank {parent_rank} in parent communicator."
            raise ValueError(msg)
        return proc_tasks[parent_rank]

    def current_task(self):
        """Get the task identifier of the current mpi process.

        When the process belongs to several tasks, the first one is returned.
        """
        t = self.task_on_proc(self._domain._parent_rank)
        try:
            return t[0]
        except (IndexError, TypeError):
            # Scalar task id: numpy scalars raise IndexError, python ints TypeError.
            return t

    def current_task_list(self):
        """Get the task identifier(s) of the current mpi process.

        Always returns a sequence (list, tuple or numpy array) of task ids.
        """
        t = self.task_on_proc(self._domain._parent_rank)
        if isinstance(t, (list, tuple, np.ndarray)):
            return t
        return [t]

    def get_task_comm(self, task_id=None):
        """
        Return the communicator of task_id that owns the current process.
        This is the sub-communicator which has been obtained by splitting
        the parent communicator by colors (proc_tasks).

        task_id defaults to the current task; returns None when the current
        process has no communicator for that task.
        """
        if task_id is None:
            task_id = self.current_task()
        return self._domain._task_comm.get(task_id)

    def _get_task_comm(self):
        """Return the task communicator of the current task (see get_task_comm)."""
        return self.get_task_comm()

    def task_rank(self, task_id=None):
        """Return the rank of the process in the task communicator.

        task_id defaults to the current task; returns None when the current
        process has no rank in that task.
        """
        if task_id is None:
            task_id = self.current_task()
        return self._domain._task_rank.get(task_id)

    def _is_task_matters(self, tid, proctasks):
        """Return whether task tid matters on the proc_tasks item proctasks."""
        return self._domain._is_task_matters(tid, proctasks)

    def is_on_task(self, params):
        """Test if the current process corresponds to the given task.

        Parameters
        ----------
        params : MPIParams or int
            Either MPI parameters (their task_id is used) or a task id.

        Raises
        ------
        TypeError
            If no task id can be extracted from params.
        """
        if isinstance(params, MPIParams):
            task_id = params.task_id
        elif isinstance(params, (int, npw.integer)):
            task_id = params
        else:
            msg = "Could not extract task_id from type {}."
            msg = msg.format(type(params))
            raise TypeError(msg)
        return self._is_task_matters(
            task_id, self.task_on_proc(self._domain._parent_rank)
        )

    def print_topologies(self):
        """Print all topologies registered on the domain."""
        print(self.short_description() + " defined the following topologies:")
        for topo in self._domain._registered_topologies.values():
            print(" *" + topo.short_description())

    @abstractmethod
    def short_description(self):
        """Return a short description of this domain as a string."""
        pass

    @abstractmethod
    def long_description(self):
        """Return a long description of this domain as a string."""
        pass

    def __eq__(self, other):
        if not isinstance(other, DomainView):
            return NotImplemented
        # Same underlying domain object and equal topology states.
        return self._domain is other._domain and bool(
            self._topology_state == other._topology_state
        )

    def __ne__(self, other):
        eq = self.__eq__(other)
        if eq is NotImplemented:
            return NotImplemented
        return not eq

    def __hash__(self):
        return id(self._domain) ^ hash(self._topology_state)

    def __str__(self):
        """Equivalent to self.long_description()"""
        return self.long_description()

    def tasks_overlapping(self, ta, tb):
        """Return the task whose communicator is shared by overlapping tasks ta and tb.

        The value is None when ta and tb were not detected as nested; raises
        KeyError for unknown tasks or when ta == tb.
        """
        return self._domain._overlapping_map[ta][tb]

    domain = property(_get_domain)
    dim = property(_get_dim)
    proc_tasks = property(_get_proc_tasks)
    parent_comm = property(_get_parent_comm)
    task_comm = property(_get_task_comm)
    parent_rank = property(_get_parent_rank)
    has_tasks = property(_get_has_tasks)
    all_tasks = property(_get_all_tasks)
    machine_comm = property(_get_machine_comm)
    machine_rank = property(_get_machine_rank)
    registered_topologies = property(_get_registered_topologies)
    frame = property(_get_frame)
class Domain(RegisteredObject, metaclass=ABCMeta):
    """Abstract base class for the description of physical domains."""

    @debug
    def __init__(self, dim, parent_comm=None, proc_tasks=None, **kwds):
        super().__init__(
            dim=dim,
            parent_comm=parent_comm,
            proc_tasks=proc_tasks,
            tag_prefix="d",
            **kwds,
        )

    @debug
    def __new__(cls, dim, parent_comm=None, proc_tasks=None, **kwds):
        """
        Create or get an existing physical domain of given dim on a specified MPI
        communicator and specific tasks.

        Parameters
        ----------
        dim : int
            Dimension of the domain.
        parent_comm : MPI.Intracomm, optional
            Parent communicator which may be split.
            If not given this will be hysop.core.mpi.main_comm.
        proc_tasks : tuple of ints or tuple of tuples of int, optional
            Mapping between mpi process rank and task identifier.
            If not given all procs will be on task HYSOP_DEFAULT_TASK_ID.

        Attributes
        ----------
        dim : int
            Dimension of the domain.
        proc_tasks : tuple of ints or tuples of int
            Mapping between mpi process rank and task identifier.
        parent_comm: MPI.Intracomm
            Return the parent communicator used to create this domain.
        parent_rank: int
            Return the rank of the process in the parent communicator.
        task_comm : MPI.IntraComm
            Return the communicator that owns the current process.
            This is the sub-communicator which has been obtained by splitting
            the parent communicator by colors (proc_tasks).
        task_rank: int
            Return the rank of the process in the task communicator.
        registered_topologies : dict
            Dictionary of all topologies already built on this domain
            with topology ids as keys and :class:`~hysop.topology.topology.Topology` as values.

        Notes
        -----
        * Parent communicator is split/subgrouped according to proc_tasks.
        * About MPI Tasks:
          proc_tasks[n] = 12 means that task 12 owns proc n
          or equivalently that proc n is dedicated to task 12.
          proc_tasks[n] = (12, 13) means that proc n is dedicated to both tasks 12 and 13.
        * Examples of supported mapping:
            - None or [1,1,1,1] : Single task (nothing more to do)
            - [1,2,2,2] : disjoint tasks (two task_comm created by Comm_Split and one intercommunicator for each other task)
            - [(1,2), (1,2), (2,), (2,)] : nested tasks (use the largest task intracommunicator as inter-task communication)
            - [(1,), (1,2), (2,), (2,)] : non zero intersection (Not handled yet)
            - [(1,2), (1,), (2,), (2,)] : non zero intersection with same leader (Not handled yet)
        * A dupped parent_comm will return another independent domain instance,
          because MPI communicators are hashed through their python object id.
        """
        dim = int(dim)
        parent_comm = parent_comm or main_comm
        check_instance(proc_tasks, tuple, values=(int, tuple, list), allow_none=True)
        proc_tasks = proc_tasks or [(HYSOP_DEFAULT_TASK_ID,)] * parent_comm.Get_size()
        assert len(proc_tasks) == parent_comm.Get_size(), f"{proc_tasks}"
        assert all(type(_) is type(proc_tasks[0]) for _ in proc_tasks)

        # Sort tasks and flatten if single task per proc.
        try:
            # Tasks are sorted on each proc according to task size (number of
            # processes involved in the task), largest first.
            def s_proc_tasks(pt):
                return tuple(
                    sorted(
                        pt, key=lambda t: sum(t in _ for _ in proc_tasks), reverse=True
                    )
                )

            proc_tasks = npw.asarray(
                [
                    npw.asarray(s_proc_tasks(pt), dtype=HYSOP_INTEGER)
                    for pt in proc_tasks
                ],
                dtype=object,
            )
            if all(len(_) == 1 for _ in proc_tasks):
                proc_tasks = npw.asarray(
                    [_[0] for _ in proc_tasks], dtype=HYSOP_INTEGER
                )
        except TypeError:
            # Items are plain ints (not iterable): one task per process.
            assert type(proc_tasks[0]) is int
            proc_tasks = npw.asarray(proc_tasks, dtype=HYSOP_INTEGER)
        npw.set_readonly(proc_tasks)

        # double check types, to be sure RegisteredObject will work as expected
        check_instance(dim, int)
        check_instance(parent_comm, MPI.Intracomm)
        obj = super().__new__(
            cls,
            dim=dim,
            parent_comm=parent_comm,
            proc_tasks=proc_tasks,
            tag_prefix="d",
            **kwds,
        )
        if not obj.obj_initialized:
            obj.__initialize(dim, parent_comm, proc_tasks)
        return obj

    @debug
    def __initialize(self, dim, parent_comm, proc_tasks):
        """Build task and machine communicators and store the domain attributes.

        Called from __new__ when the registered object is not yet initialized.
        proc_tasks is the read-only numpy array built in __new__: either a flat
        integer array (one task per process) or an object array whose items
        are per-process task arrays.
        """
        parent_rank = parent_comm.Get_rank()
        parent_size = parent_comm.Get_size()

        # is_task_matters(t, pt): whether task t matters on proc_tasks item pt.
        try:
            # proc_tasks items are iterables of task ids; iterating over plain
            # integers raises TypeError and falls back to the except branch.
            all_tasks = {t for _ in proc_tasks for t in _}
            # Check for nested tasks: one task must contain all procs.
            msg = "Non nested tasks are not handled yet (given proc tasks : {})".format(
                proc_tasks
            )
            assert any(all(t in _ for _ in proc_tasks) for t in all_tasks), msg
            # Check that all tasks share the same root process.
            all_tasks_roots = [
                next(i for i, _ in enumerate(proc_tasks) if t in _) for t in all_tasks
            ]
            msg = "Nested tasks are not sharing the same root (given proc tasks : {}, root index : {})".format(
                proc_tasks, dict(zip(all_tasks, all_tasks_roots))
            )
            assert all(all_tasks_roots[0] == _ for _ in all_tasks_roots), msg

            def is_task_matters(t, pt):
                return t in pt

        except TypeError:
            # proc tasks must contain integers as task ids
            all_tasks = set(proc_tasks)

            def is_task_matters(t, pt):
                return t == pt

        assert (
            len(all_tasks) <= 2
        ), "Tasks intercommunicator has not been tested with 3 tasks or more"

        if len(all_tasks) == 1:
            task_comm = {next(iter(all_tasks)): parent_comm.Dup()}
        else:
            assert len(proc_tasks) == parent_size
            if all(isinstance(_, HYSOP_INTEGER) for _ in proc_tasks):
                # Single task per proc : need comm split
                task_comm = {
                    proc_tasks[parent_rank]: parent_comm.Split(
                        color=proc_tasks[parent_rank], key=parent_rank
                    )
                }
            else:
                # Multiple tasks per proc : need MPI groups
                parent_group = parent_comm.Get_group()
                ranks_tasks = {
                    t: [i for i, _ in enumerate(proc_tasks) if t in _]
                    for t in all_tasks
                }
                task_comm = {
                    _: parent_comm.Create_group(parent_group.Incl(ranks_tasks[_]))
                    for _ in all_tasks
                }
                # Remove null communicators
                task_comm = {t: c for t, c in task_comm.items() if c != MPI.COMM_NULL}

        # local ranks in tasks
        task_rank = {t: c.Get_rank() for t, c in task_comm.items()}

        # Build the root rank of each task on all processes
        task_root_in_parent = {}
        all_task_ranks = parent_comm.allgather(task_rank)
        for t in all_tasks:
            for i, r in enumerate(all_task_ranks):
                # Process i is the root of task t when it is involved in task t
                # and its rank *in task t* is 0 (not merely 0 in some task).
                if is_task_matters(t, proc_tasks[i]) and r.get(t) == 0:
                    task_root_in_parent[t] = i

        # Create intercommunicators from current task to others
        task_intercomm = {}
        # task overlapping map : gives the largest task of two overlapping tasks
        overlapping_map = {
            _: {__: None for __ in all_tasks if _ != __} for _ in all_tasks
        }
        # For all tasks the current rank is involved in
        my_tasks = tuple(
            _ for _ in all_tasks if is_task_matters(_, proc_tasks[parent_rank])
        )
        for tsource in my_tasks:
            for tdest in (_ for _ in all_tasks if _ != tsource):
                remote_leader = MPI.PROC_NULL
                if is_task_matters(tdest, proc_tasks[parent_rank]):
                    remote_leader = task_root_in_parent[tsource]
                # NOTE: tsource always matters here (it comes from my_tasks), so
                # the remote leader ends up being the root of tdest.
                if is_task_matters(tsource, proc_tasks[parent_rank]):
                    remote_leader = task_root_in_parent[tdest]
                intercomm = None
                if remote_leader != task_root_in_parent[tsource]:
                    # Disjoint tasks
                    intercomm = task_comm[tsource].Create_intercomm(
                        0, parent_comm, remote_leader
                    )
                elif any(all(t in _ for _ in proc_tasks) for t in all_tasks):
                    # TODO: review if nested tasks with different ranks
                    # (for the moment ranks are expected identical through all local tasks).
                    # If nested tasks, we use the largest task communicator
                    largest_task = next(
                        t for t in all_tasks if all(t in _ for _ in proc_tasks)
                    )
                    intercomm = task_comm[largest_task]
                    overlapping_map[tsource][tdest] = largest_task
                    overlapping_map[tdest][tsource] = largest_task
                else:
                    raise NotImplementedError()
                task_intercomm[tdest] = intercomm

        # Build a per-machine communicator in order to get a rank on local machines.
        # Split according to the md5 hash of the machine name converted to an
        # integer color (machine names generally differ by a single character).
        machine_name_hash = int(
            hashlib.md5(MPI.Get_processor_name().encode("utf-8")).hexdigest(),
            16,
        )
        machine_comm = parent_comm.Split(
            color=np.int32(machine_name_hash % np.iinfo(np.int32).max),
            key=parent_rank,
        )
        machine_rank = machine_comm.Get_rank()

        self._dim = dim
        self._parent_comm = parent_comm
        self._parent_rank = parent_rank
        self._is_task_matters = is_task_matters
        self._task_rank = task_rank
        self._task_comm = task_comm
        self._all_tasks = all_tasks
        self._task_root_in_parent = task_root_in_parent
        self._task_intercomm = task_intercomm
        self._has_tasks = len(all_tasks) > 1
        self._machine_comm = machine_comm
        self._machine_rank = machine_rank
        self._proc_tasks = proc_tasks
        self._registered_topologies = {}
        self._frame = SymbolicFrame(dim=dim)
        self._overlapping_map = overlapping_map

    def register_topology(self, topo):
        """Register a new topology on this domain.

        If a topology with the same id is already registered, it must be the
        very same object (asserted) and nothing is done.
        """
        from hysop.topology.topology import Topology

        check_instance(topo, Topology)
        topo_id = topo.id
        if topo_id in self._registered_topologies:
            assert topo is self._registered_topologies[topo_id]
        else:
            self._registered_topologies[topo_id] = topo

    def remove_topology(self, topo):
        """
        Remove a topology from the list of this domain.

        Returns the id of the removed topology, or -1 when the topology was
        not registered (in which case nothing is done).
        """
        from hysop.topology.topology import Topology

        check_instance(topo, Topology)
        topo_id = topo.id
        if topo_id in self._registered_topologies:
            self._registered_topologies.pop(topo_id)
        else:
            topo_id = -1
        return topo_id

    @abstractmethod
    def view(self, topology_state):
        """Return a view of this domain altered by some topology_state."""
        pass